跳到主要内容

最佳实践

玩转 Apache Doris 分区分桶

https://zhuanlan.zhihu.com/p/605213490

建立普通去重表

CREATE TABLE IF NOT EXISTS xxx
(
`xx` string NOT NULL COMMENT "x",
`x` string COMMENT "x",
`x` string COMMENT "x"
)
DUPLICATE KEY(`xx`)
DISTRIBUTED BY HASH(`xx`) BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);

查看表副本情况

ADMIN SHOW REPLICA STATUS FROM xxx;

权限问题

创建用户

#Doris创建用户。
CREATE USER 'test' IDENTIFIED BY 'test';
grant ALL on 数据库.* to test@'%';

#ProxySQL也需要添加用户,如果没有使用ProxySQL,那么这个操作就不需要。
insert into mysql_users(username,password,default_hostgroup) values('test','test',10);

相关链接

创建用户

https://doris.apache.org/zh-CN/docs/1.2/data-table/basic-usage?_highlight=%E5%88%9B%E5%BB%BA&_highlight=%E7%94%A8%E6%88%B7#%E5%88%9B%E5%BB%BA%E7%94%A8%E6%88%B7

授权

https://doris.apache.org/zh-CN/docs/1.2/sql-manual/sql-reference/Account-Management-Statements/GRANT

ProxySQL操作

https://hf200012.github.io/2021/09/Apache-doris-FE%E4%BD%BF%E7%94%A8ProxySQL%E5%AE%9E%E7%8E%B0%E8%B4%9F%E8%BD%BD%E5%9D%87%E8%A1%A1/

最佳实践

https://selectdb.feishu.cn/docx/doxcnm0uTBWFTc4Qn9A1WHuqrcg

MySQL表映射到Doris建表语句脚本

安装依赖

pip install pymysql

代码

import mysql.connector

# MySQL数据库连接信息
mysql_host = "数据库密码"
mysql_port = 3306
mysql_user = "用户名"
mysql_password = "密码"
mysql_database = "数据库"
mysql_table = "表名"
# 生成doris前缀名称
doris_prefix = "数据库名称.ods_[数据来源|mysql|oracle|reptile]_主题域"


# 自定义字段类型映射关系
type_mapping = {
"varchar": "STRING",
"char": "STRING",
"text": "STRING",
"tinytext": "STRING",
"mediumtext": "STRING",
"longtext": "STRING",
"int": "BIGINT",
"tinyint": "BIGINT",
"smallint": "BIGINT",
"mediumint": "BIGINT",
"bigint": "BIGINT",
"float": "DECIMAL(27,6)",
"double": "DECIMAL(27,6)",
"decimal": "DECIMAL(27,6)",
"date": "STRING",
"datetime": "STRING",
"timestamp": "STRING",
# 添加更多映射关系
}

# 连接MySQL数据库
conn = mysql.connector.connect(
host=mysql_host,
user=mysql_user,
password=mysql_password,
port=mysql_port,
database=mysql_database
)

cursor = conn.cursor()

# 查询MySQL表结构
query_column_info = f"SELECT COLUMN_NAME, COLUMN_COMMENT,DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{mysql_table}'"
query_table_info = f"SELECT TABLE_COMMENT FROM information_schema.tables WHERE TABLE_NAME = '{mysql_table}'"
cursor.execute(query_column_info)
table_structure = cursor.fetchall()
cursor.execute(query_table_info)
table_info = cursor.fetchall()

# 关闭MySQL连接
cursor.close()
conn.close()

# 生成Doris建表语句
doris_table_name = f"{doris_prefix}_{mysql_table}"
doris_columns = []
select_columns = []

table_info_comment = table_info[0][0]
table_pro=f'''
ENGINE=OLAP
DUPLICATE KEY(`ts`)
COMMENT '{table_info_comment}'
PARTITION BY RANGE(`ts`)()
DISTRIBUTED BY HASH(ts) BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-2147483648",
"dynamic_partition.end" = "4",
"dynamic_partition.prefix" = "p",
"dynamic_partition.replication_allocation" = "tag.location.default: 3",
"dynamic_partition.buckets" = "10",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.history_partition_num" = "47",
"dynamic_partition.hot_partition_num" = "0",
"dynamic_partition.reserved_history_periods" = "NULL",
"dynamic_partition.storage_policy" = "",
"dynamic_partition.storage_medium" = "HDD",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
);
'''


for column_info in table_structure:
column_name,column_comment, column_type = column_info
mysql_column_type = column_type # 去除类型参数
doris_column_type = type_mapping.get(mysql_column_type.lower()) # 使用映射关系或者默认字段类型
doris_column = f"`{column_name}` {doris_column_type} COMMENT '{column_comment}'"
doris_columns.append(doris_column)

#得到对应需要的select字段
select_column = f"{column_name}"
select_columns.append(select_column)

doris_create_table_sql = f"CREATE TABLE IF NOT EXISTS {doris_table_name} (\n"
doris_create_table_sql += ",\n".join(doris_columns)
doris_create_table_sql += "\n)"+table_pro

select_column_info=",\n".join(select_columns)

# 打印生成的Doris建表语句
print("Doris建表语句: ")
print("===========================================")
print(doris_create_table_sql)
print("Doris表字段为: ")
print("===========================================")
print(select_column_info)

效果

===========================================
CREATE TABLE IF NOT EXISTS 数据库名称.表名 (
`item1` STRING COMMENT '字段一',
`item2` STRING COMMENT '字段二',
`item3` STRING COMMENT '字段三',
)
ENGINE=OLAP
DUPLICATE KEY(`item1`)
COMMENT ''
DISTRIBUTED BY HASH(item1) BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
);

Doris表字段为:
===========================================
item1,
item2,
item3

Oracle表映射到Doris建表语句脚本

安装依赖

python -m pip install oracledb

代码

import oracledb

# python -m pip install oracledb

# 设置Oracle数据库连接信息
oracle_host = 'ip'
oracle_port = 1521
oracle_sid = 'sid'
oracle_user = 'user'
oracle_password = 'passwd'
oracle_table='tablename'
oracle_database="数据库名称"
doris_prefix = "数据库名称.ods_[数据来源|mysql|oracle|reptile]_主题域"

oracle_to_doris_type_mapping = {
"VARCHAR2": "STRING",
"CHAR": "STRING",
"NCHAR": "STRING",
"NVARCHAR2": "STRING",
"CLOB": "STRING",
"NCLOB": "STRING",
"NUMBER": "DECIMAL(27,6)",
"BINARY_FLOAT": "FLOAT",
"BINARY_DOUBLE": "DOUBLE",
"DATE": "STRING",
"TIMESTAMP": "STRING",
# 添加更多映射关系
}

table_pro=f'''
ENGINE=OLAP
DUPLICATE KEY(`ts`)
COMMENT ''
PARTITION BY RANGE(`ts`)()
DISTRIBUTED BY HASH(ts) BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-2147483648",
"dynamic_partition.end" = "4",
"dynamic_partition.prefix" = "p",
"dynamic_partition.replication_allocation" = "tag.location.default: 3",
"dynamic_partition.buckets" = "10",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.history_partition_num" = "47",
"dynamic_partition.hot_partition_num" = "0",
"dynamic_partition.reserved_history_periods" = "NULL",
"dynamic_partition.storage_policy" = "",
"dynamic_partition.storage_medium" = "HDD",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
);
'''


# 创建 Oracle 数据库连接
connection = oracledb.connect(
user=oracle_user,
password=oracle_password,
dsn=f"(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST={oracle_host})(PORT={oracle_port}))(CONNECT_DATA=(SERVICE_NAME={oracle_sid})))"
)

# 创建游标
cursor = connection.cursor()

query_str=f'''select b.COLUMN_NAME as fieldName,b.DATA_TYPE as dataType,
CASE WHEN b.DATA_TYPE IN ('NUMBER','FLOAT') THEN b.DATA_PRECISION ELSE b.CHAR_LENGTH END as dataPrecision ,
b.DATA_SCALE as dataScale, a.COMMENTS as fieldDesc from ALL_TAB_COLUMNS b,ALL_COL_COMMENTS a
where b.TABLE_NAME = '{oracle_table}' and b.OWNER = '{oracle_database}'
and a.OWNER = '{oracle_database}' and
b.TABLE_NAME = a.TABLE_NAME and b.COLUMN_NAME = a.COLUMN_NAME order by b.COLUMN_ID'''
# 执行 SQL 查询
cursor.execute(query_str)

# 获取查询结果
table_structure = cursor.fetchall()

# 关闭游标和连接
cursor.close()
connection.close()

doris_columns = []
select_columns = []

for column_info in table_structure:
column_name, column_type,_,_,column_comment = column_info
mysql_column_type = column_type # 去除类型参数
doris_column_type = oracle_to_doris_type_mapping.get(mysql_column_type) # 使用映射关系或者默认字段类型
doris_column = f"`{column_name.lower()}` {doris_column_type} COMMENT '{column_comment}'"
doris_columns.append(doris_column)

#得到对应需要的select字段
select_column = f"{column_name.lower()}"
select_columns.append(select_column)

doris_create_table_sql = f"CREATE TABLE IF NOT EXISTS {doris_prefix}{oracle_table} (\n"
doris_create_table_sql += ",\n".join(doris_columns)
doris_create_table_sql += "\n)"+table_pro

select_column_info=",\n".join(select_columns)

# 打印生成的Doris建表语句
print("Doris建表语句: ")
print("===========================================")
print(doris_create_table_sql)
print("Doris表字段为: ")
print("===========================================")
print(select_column_info)